Windows IOCP 示例
这个示例将接收到的数据原样返回给客户端:
1#include <WinSock2.h>
2#include <WS2tcpip.h>
3#include <MSWSock.h>
4#include <cstdio>
5
6#define BUFFER_SIZE 1024
7
// Identifies which kind of overlapped operation a completion packet belongs to.
enum class IocpType
{
    ACCEPT = 0, // an AcceptEx call posted on the listening socket
    RECV = 1,   // a WSARecv call posted on a connection
    SEND = 2    // a WSASend call posted on a connection
};
14
15struct IocpContext : public WSAOVERLAPPED
16{
17 IocpContext() :
18 WSAOVERLAPPED{}
19 {
20
21 }
22
23 WSABUF wsaBuf{BUFFER_SIZE, buffer};
24 char buffer[BUFFER_SIZE];
25 SOCKET sock;
26 IocpType type;
27};
28
29// 提交一个异步的 accept 操作
30bool postAccpet(SOCKET server)
31{
32 // 创建上下文
33 IocpContext* ctx = new IocpContext;
34 ctx->type = IocpType::ACCEPT;
35
36 // 创建接收连接的socket
37 ctx->sock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED);
38 if (ctx->sock == SOCKET_ERROR)
39 {
40 fprintf(stderr, "WSASocketW: %s\n", strerror(WSAGetLastError()));
41 closesocket(ctx->sock);
42 return false;
43 }
44
45 // 加载 AcceptEX 函数
46 LPFN_ACCEPTEX lpfnAcceptEx = NULL;
47 GUID GuidAcceptEx = WSAID_ACCEPTEX;
48 DWORD dwBytes;
49 {
50 int ret = WSAIoctl(server, SIO_GET_EXTENSION_FUNCTION_POINTER,
51 &GuidAcceptEx, sizeof (GuidAcceptEx),
52 &lpfnAcceptEx, sizeof (lpfnAcceptEx),
53 &dwBytes, NULL, NULL);
54 if (ret == SOCKET_ERROR)
55 {
56 fprintf(stderr, "WSAIoctl: %s\n", strerror(WSAGetLastError()));
57 closesocket(ctx->sock);
58 return false;
59 }
60 }
61
62 // 通过 AcceptEx 发起异步的 accept 操作
63 {
64 DWORD addrlen = sizeof(struct sockaddr_in);
65 DWORD recvlen;
66 BOOL ret = lpfnAcceptEx(server,
67 ctx->sock,
68 ctx->buffer,
69 BUFFER_SIZE - 2*(addrlen+16),
70 addrlen + 16,
71 addrlen + 16,
72 &recvlen,
73 ctx);
74 if (!ret && WSAGetLastError() != ERROR_IO_PENDING)
75 {
76 fprintf(stderr, "AcceptEx: %s\n", strerror(WSAGetLastError()));
77 closesocket(ctx->sock);
78 return false;
79 }
80 }
81
82 return true;
83}
84
85// 提交一个异步的 RECV 操作
86bool postRecv(SOCKET sock)
87{
88 IocpContext* ctx = new IocpContext;
89 ctx->sock = sock;
90 ctx->type = IocpType::RECV;
91 DWORD nBytes = BUFFER_SIZE;
92 DWORD flags = 0;
93 int ret = WSARecv(sock, &(ctx->wsaBuf), 1, &nBytes, &flags, ctx, nullptr);
94 if (ret == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING)
95 {
96 fprintf(stderr, "WSARecv: %s\n", strerror(WSAGetLastError()));
97 return false;
98 }
99
100 return true;
101}
102
103// 提交一个异步的 SEND 操作
104bool postSend(SOCKET sock, const char* data, DWORD size)
105{
106 IocpContext* ctx = new IocpContext;
107 ctx->sock = sock;
108 ctx->type = IocpType::SEND;
109 memcpy(ctx->buffer, data, size);
110 DWORD nBytes = size;
111 ctx->wsaBuf.len = size;
112 DWORD flags = 0;
113 int ret = WSASend(sock, &(ctx->wsaBuf), 1, &nBytes, flags, ctx, nullptr);
114 if (ret == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING)
115 {
116 fprintf(stderr, "WSASend: %s\n", strerror(WSAGetLastError()));
117 return false;
118 }
119
120 return true;
121}
122
123int main()
124{
125 WSAData wsa;
126 if (WSAStartup(0x202, &wsa) != NO_ERROR)
127 {
128 fprintf(stderr, "WSAStartup failed\n");
129 return EXIT_FAILURE;
130 }
131
132 // 创建服务socket
133 SOCKET server = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, nullptr, 0, WSA_FLAG_OVERLAPPED);
134 if (server == INVALID_SOCKET)
135 {
136 fprintf(stderr, "WSASocketW: %s\n", strerror(WSAGetLastError()));
137 WSACleanup();
138 return EXIT_FAILURE;
139 }
140
141 // 设为非阻塞
142 unsigned long value = 1;
143 if (ioctlsocket(server, FIONBIO, &value) == SOCKET_ERROR)
144 {
145 fprintf(stderr, "ioctlsocket: %s\n", strerror(WSAGetLastError()));
146 closesocket(server);
147 WSACleanup();
148 return EXIT_FAILURE;
149 }
150
151 // 绑定端口
152 struct sockaddr_in address {};
153 address.sin_family = AF_INET;
154 address.sin_port = htons(8080);
155 inet_pton(AF_INET, "127.0.0.1", &address.sin_addr);
156 if (bind(server, (const sockaddr*)(&address), sizeof(address)) == SOCKET_ERROR)
157 {
158 fprintf(stderr, "bind: %s\n", strerror(WSAGetLastError()));
159 closesocket(server);
160 WSACleanup();
161 return EXIT_FAILURE;
162 }
163
164 // 监听
165 if (listen(server, SOMAXCONN) == SOCKET_ERROR)
166 {
167 fprintf(stderr, "listen: %s\n", strerror(WSAGetLastError()));
168 closesocket(server);
169 WSACleanup();
170 return EXIT_FAILURE;
171 }
172
173 // 创建 IOCP handle
174 HANDLE handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
175 if (handle == INVALID_HANDLE_VALUE)
176 {
177 fprintf(stderr, "CreateIoCompletionPort: %s\n", strerror(WSAGetLastError()));
178 closesocket(server);
179 WSACleanup();
180 return EXIT_FAILURE;
181 }
182
183 // 将 server 绑定到 IOCP 上
184 if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(server), handle, 0, 0) == nullptr)
185 {
186 fprintf(stderr, "CreateIoCompletionPort: %s\n", strerror(WSAGetLastError()));
187 closesocket(server);
188 WSACleanup();
189 return EXIT_FAILURE;
190 }
191
192 // 提交一个异步的 accept 操作
193 if (postAccpet(server) == false)
194 {
195 closesocket(server);
196 WSACleanup();
197 return EXIT_FAILURE;
198 }
199
200 while (true)
201 {
202 // 等待操作完成
203 IocpContext* ctx = nullptr;
204 DWORD lpNumberOfBytesTransferred = 0;
205 void* lpCompletionKey = nullptr;
206 {
207 BOOL ret = GetQueuedCompletionStatus(
208 handle,
209 &lpNumberOfBytesTransferred,
210 (PULONG_PTR) &lpCompletionKey,
211 (LPOVERLAPPED *) &ctx,
212 INFINITE);
213 if (!ret)
214 continue;
215 }
216
217 // 处理 ACCEPT 操作
218 if (ctx->type == IocpType::ACCEPT)
219 {
220 // 重新发起一个异步的 accept 操作,接收下一个连接
221 postAccpet(server);
222
223 // 将连接设为非阻塞
224 unsigned long value = 1;
225 if (ioctlsocket(ctx->sock, FIONBIO, &value) == SOCKET_ERROR)
226 {
227 fprintf(stderr, "ioctlsocket: %s\n", strerror(WSAGetLastError()));
228 closesocket(ctx->sock);
229 delete ctx;
230 continue;
231 }
232
233 // 将连接绑定到 IOCP 上
234 if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(ctx->sock), handle, 0, 0) == nullptr)
235 {
236 fprintf(stderr, "CreateIoCompletionPort: %s\n", strerror(WSAGetLastError()));
237 closesocket(ctx->sock);
238 delete ctx;
239 continue;
240 }
241
242 // 发起一个异步的 send 操作,NOTE: AcceptEx 会读取第一帧数据
243 if (lpNumberOfBytesTransferred > 0)
244 postSend(ctx->sock, ctx->buffer, lpNumberOfBytesTransferred);
245
246 // 发起一个异步的 recv 操作,接受后续数据
247 if (postRecv(ctx->sock) == false)
248 {
249 closesocket(ctx->sock);
250 delete ctx;
251 continue;
252 }
253
254 delete ctx;
255 continue;
256 }
257
258 // 处理 RECV 操作
259 if (ctx->type == IocpType::RECV)
260 {
261 // 连接出错或断开
262 if (lpNumberOfBytesTransferred <= 0)
263 {
264 closesocket(ctx->sock);
265 delete ctx;
266 continue;
267 }
268
269 // 发起一个异步的 send 操作
270 postSend(ctx->sock, ctx->buffer, lpNumberOfBytesTransferred);
271
272 // 发起一个异步的 recv 操作,接收后续数据
273 if (postRecv(ctx->sock) == false)
274 {
275 closesocket(ctx->sock);
276 delete ctx;
277 continue;
278 }
279
280 delete ctx;
281 continue;
282 }
283
284 // 处理 SEND 操作
285 if (ctx->type == IocpType::SEND)
286 {
287 delete ctx;
288 continue;
289 }
290 }
291}使用 Apache HTTP server benchmarking tool 进行测试,结果如下:
1Server Software:
2Server Hostname: localhost
3Server Port: 8080
4
5Document Path: /
6Document Length: 0 bytes
7
8Concurrency Level: 10000
9Time taken for tests: 207.340 seconds
10Complete requests: 10000000
11Failed requests: 0
12Non-2xx responses: 10000000
13Keep-Alive requests: 10000000
14Total transferred: 1060000000 bytes
15HTML transferred: 0 bytes
16Requests per second: 48229.98 [#/sec] (mean)
17Time per request: 207.340 [ms] (mean)
18Time per request: 0.021 [ms] (mean, across all concurrent requests)
19Transfer rate: 4992.56 [Kbytes/sec] received
20
21Connection Times (ms)
22 min mean[+/-sd] median max
23Connect: 0 0 0.0 0 16
24Processing: 114 205 19.3 204 1214
25Waiting: 0 205 19.3 204 1214
26Total: 114 205 19.3 204 1214
27
28Percentage of the requests served within a certain time (ms)
29 50% 204
30 66% 205
31 75% 206
32 80% 206
33 90% 208
34 95% 210
35 98% 212
36 99% 213
37 100% 1214 (longest request)